from keras.models import Model
from keras.layers import *
from keras.optimizers import *
from keras.callbacks import *
from keras import backend as K
from keras.layers.advanced_activations import *
from keras import metrics
from keras.applications import *
from keras.preprocessing import image
from keras.initializers import *
from keras import losses
from keras import regularizers
from keras.preprocessing.image import load_img, save_img, img_to_array
import tensorflow as tf
import numpy as np
import os
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import cv2
import time
def denormalise_data(x):
    """Map values from the [-1, 1] training range back to uint8 pixels [0, 255]."""
    rescaled = x * 127.5 + 127.5
    return np.array(rescaled, np.uint8)
def normalise_data(x):
    """Scale uint8 pixel values [0, 255] into [-1, 1] (matches the tanh output range)."""
    scale = 127.5
    return x / scale - 1
def generate_data(data_location):
    """Yield normalised training batches from the .npy files in `data_location`.

    Each file is loaded with np.load and scaled to [-1, 1] via normalise_data.

    BUGFIX: os.listdir returns entries in arbitrary, filesystem-dependent
    order, which made epoch ordering nondeterministic; filenames are now
    sorted. Paths are joined with os.path.join instead of string
    concatenation.
    """
    for batch_name in sorted(os.listdir(data_location)):
        data = np.load(os.path.join(data_location, batch_name))
        yield normalise_data(data)
def train(model, epochs, lr, data_location, batch_size=32):
    """Compile `model` with Adam/MSE and fit it as an autoencoder (x == y).

    Parameters
    ----------
    model : keras Model to train in place.
    epochs : number of passes over all batch files in `data_location`.
    lr : Adam learning rate.
    data_location : directory of .npy batch files (see generate_data).
    batch_size : mini-batch size passed to model.fit.

    Returns
    -------
    (last_loss, elapsed) : the loss history entry of the final fit call and
    the total wall-clock seconds for ALL epochs (note: callers print this as
    "time for 1 epoch", which is only accurate when epochs == 1).
    """
    optim = Adam(lr)
    model.compile(loss='mse', optimizer=optim)
    # BUGFIX: renamed from `losses` — the original local shadowed the
    # `keras.losses` module imported at the top of the file.
    loss_history = []
    start = time.time()
    for epoch in range(epochs):
        for data in generate_data(data_location):
            hist = model.fit(x=data, y=data, batch_size=batch_size, verbose=1)
            loss_history.append(hist.history['loss'])
    elapsed = time.time() - start
    plt.plot(loss_history)
    plt.show()
    return loss_history[-1], elapsed
def visualize(model, layer_name, sample, size=(10, 10)):
    """Plot every channel of `layer_name`'s activation for one image, then
    show the model's prediction next to the (normalised) input.

    Parameters
    ----------
    model : keras Model; its first layer's output is used as the probe input.
    layer_name : name of the layer whose feature maps are displayed.
    sample : a single image array — assumed (H, W, C) in raw pixel range
        [0, 255] (it is normalised here) — TODO confirm against callers.
    size : matplotlib figsize for the activation grid.
    """
    print("Visualising :", layer_name)
    sample = np.array([sample])          # add a batch dimension
    sample = normalise_data(sample)
    # Build a sub-model from the network input to the requested layer and
    # run the sample through it.
    x = model.get_layer(layer_name).output
    inp = model.layers[0].output
    out = x
    test = Model(inputs=inp, outputs=out).predict(sample)
    _, h, w, c = test.shape
    plt.figure(figsize=size)
    # BUGFIX: plt.subplot requires integer grid dimensions; np.ceil returns
    # a float, which modern matplotlib rejects.
    it = int(np.ceil(np.sqrt(c)))
    for channel in range(c):
        plt.subplot(it, it, channel + 1)
        a = test[0, :, :, channel]
        # BUGFIX: np.interp needs strictly increasing xp; a constant feature
        # map (min == max) made the mapping undefined — show zeros instead.
        if a.max() > a.min():
            o = np.interp(a, (a.min(), a.max()), (0, 1))
        else:
            o = np.zeros_like(a)
        plt.imshow(o, cmap='gray')
    plt.show()
    print("Looking at the prediction")
    res = model.predict(sample)
    plt.figure(figsize=(10, 10))
    plt.subplot(1, 2, 1)
    plt.imshow(denormalise_data(sample[0]))
    plt.subplot(1, 2, 2)
    plt.imshow(denormalise_data(res[0]))
    plt.show()  # BUGFIX: final comparison figure was never rendered in script mode
## Load one pre-batched .npy file from the dataset and preview an image.
data_location = '../../Dataset/ms_coco_npy'
sample = np.load(f"{data_location}/batch_1.npy")
plt.imshow(sample[40])
plt.show()
## PS: I initially omitted the tanh at the end. Since my data is normalised to (-1, 1), a non-tanh output layer does not enforce that range, so a few values can fall outside it, and it also affects how long it takes to minimise the loss. So the tanh is included — though with a model this simple it would work even without it.
##
## Experiment 1: one 16-filter linear conv layer feeding a tanh reconstruction.
inp = Input(shape=(256, 256, 3))
layer1 = Conv2D(16, (3, 3), activation=None, padding='same', name='layer1')(inp)
out = Conv2D(3, (3, 3), activation='tanh', padding='same', name='output')(layer1)
model = Model(inputs=inp, outputs=out)
model.summary()
loss, t = train(model, 1, 0.003, data_location)
print('loss : {} time for 1 epoch : {}'.format(loss, t))
visualize(model, 'layer1', sample[100])
##
## Experiment 2: same shape as experiment 1 but a much wider (128-filter)
## hidden layer; smaller batch to fit in memory.
inp = Input(shape=(256, 256, 3))
layer1 = Conv2D(128, (3, 3), activation=None, padding='same', name='layer1')(inp)
out = Conv2D(3, (3, 3), activation='tanh', padding='same', name='output')(layer1)
model = Model(inputs=inp, outputs=out)
model.summary()
loss, t = train(model, 1, 0.003, data_location, batch_size=16)
print('loss : {} time for 1 epoch : {}'.format(loss, t))
visualize(model, 'layer1', sample[100], size=(30, 30))
##
## Experiment 3: two stacked linear conv layers (32 -> 128) before the tanh output.
inp = Input(shape=(256, 256, 3))
layer1 = Conv2D(32, (3, 3), activation=None, padding='same', name='layer1')(inp)
layer2 = Conv2D(128, (3, 3), activation=None, padding='same', name='layer2')(layer1)
out = Conv2D(3, (3, 3), activation='tanh', padding='same', name='output')(layer2)
model = Model(inputs=inp, outputs=out)
model.summary()
loss, t = train(model, 1, 0.003, data_location, batch_size=16)
print('loss : {} time for 1 epoch : {}'.format(loss, t))
print("################## LAYER 1 #########################")
visualize(model, 'layer1', sample[100], size=(30, 30))
print("################## LAYER 2 #########################")
visualize(model, 'layer2', sample[100], size=(30, 30))
##
## Experiment 4: symmetric 32 -> 64 -> 32 stack of linear conv layers.
inp = Input(shape=(256, 256, 3))
layer1 = Conv2D(32, (3, 3), activation=None, padding='same', name='layer1')(inp)
layer2 = Conv2D(64, (3, 3), activation=None, padding='same', name='layer2')(layer1)
layer3 = Conv2D(32, (3, 3), activation=None, padding='same', name='layer3')(layer2)
out = Conv2D(3, (3, 3), activation='tanh', padding='same', name='output')(layer3)
model = Model(inputs=inp, outputs=out)
model.summary()
loss, t = train(model, 1, 0.003, data_location, batch_size=32)
print('loss : {} time for 1 epoch : {}'.format(loss, t))
visualize(model, 'layer2', sample[100], size=(30, 30))
##
## Experiment 5: a strided conv downsamples to a bottleneck; a transpose conv
## upsamples back before the tanh reconstruction layer.
inp = Input(shape=(256, 256, 3))
layer1 = Conv2D(32, (3, 3), activation=None, padding='same', name='layer1')(inp)
layer2 = Conv2D(64, (3, 3), strides=(2, 2), activation=None, padding='same', name='layer2')(layer1)
layer3 = Conv2DTranspose(32, (3, 3), strides=(2, 2), activation=None, padding='same', name='layer3')(layer2)
out = Conv2D(3, (3, 3), activation='tanh', padding='same', name='output')(layer3)
model = Model(inputs=inp, outputs=out)
model.summary()
loss, t = train(model, 1, 0.003, data_location, batch_size=32)
print('loss : {} time for 1 epoch : {}'.format(loss, t))
print("looking at the Convolution Bottlneck")
visualize(model, 'layer2', sample[100], size=(30, 30))
print("looking at the Deconvolution block")
visualize(model, 'layer3', sample[100], size=(30, 30))
## Looking at the output layer
visualize(model, 'output', sample[100], size=(10, 10))
##
## Deep autoencoder with U-Net-style skip connections.
## Encoder: six stride-2 convs halve the spatial size each step
## (256 -> 128 -> 64 -> 32 -> 16 -> 8 -> 4), doubling filters each step.
inp = Input(shape = (256 , 256 , 3))
layer1 = Conv2D(filters=32 , kernel_size=(3,3) , strides = (2,2) , activation='relu', padding='same' , name='layer1' , )(inp)
layer2 = Conv2D(filters=64 , kernel_size=(3,3) , strides=(2,2), activation='relu', padding='same' , name='layer2' , )(layer1)
layer3 = Conv2D(filters=128 , kernel_size=(3,3) , strides=(2,2) , activation='relu' , padding='same' , name='layer3' , )(layer2)
layer4 = Conv2D(filters = 256 , kernel_size=(3,3) , strides=(2,2) , activation='relu' , padding='same' , name='layer4' , )(layer3)
layer5 = Conv2D(filters = 512 , kernel_size=(3,3) , strides=(2,2) , activation='relu' , padding='same' , name = 'layer5' , )(layer4)
layer6 = Conv2D(filters = 1024 , kernel_size=(3,3) , strides=(2,2) , activation='relu' , padding='same' , name = 'layer6' , )(layer5)
## Default 2x2 average pooling: 4x4x1024 -> 2x2x1024 (= 4096 values).
layer7 = AveragePooling2D()(layer6) ## 4096 feature representation
## The layer 7 can now be used for any kind of visualization, The official docs of Keras mentions using t-SNE.
## We have also avoid the sparsity constraints , which will avoid all the nodes giving high output i.e feature representation
## will be more robust
############################# Decoder network ################################
## Each Conv2DTranspose doubles the spatial size; each Concatenate fuses the
## upsampled features with the same-resolution encoder layer (skip connection).
layer8 = Conv2DTranspose(filters = 1024 , kernel_size =(3,3) , strides = (2,2) , activation='relu' , padding='same' , name='layer8' , )(layer7)
layer8 = Concatenate()([layer8 , layer6])
layer9 = Conv2DTranspose(filters = 512 , kernel_size =(3,3) , strides = (2,2) , activation='relu' , padding='same' , name='layer9' , )(layer8)
layer9 = Concatenate()([layer9 , layer5])
layer10 = Conv2DTranspose(filters = 256 , kernel_size=(3,3) , strides = (2,2) , activation='relu' , padding='same' , name='layer10' , )(layer9)
layer10 = Concatenate()([layer10 , layer4])
layer11 = Conv2DTranspose(filters = 128 , kernel_size=(3,3) , strides = (2,2) , activation='relu' , padding='same' , name='layer11' , )(layer10)
layer11 = Concatenate()([layer11 , layer3])
layer12 = Conv2DTranspose(filters = 64 , kernel_size=(3,3) , strides = (2,2) , activation='relu' , padding='same' , name='layer12' , )(layer11)
layer12 = Concatenate()([layer12 , layer2])
## Last two upsampling stages (64 -> 128 -> 256) have no skip connections.
layer13 = Conv2DTranspose(filters = 32 , kernel_size=(3,3) , strides = (2,2) , activation='relu' , padding='same' , name='layer13' , )(layer12)
layer14 = Conv2DTranspose(filters = 32 , kernel_size=(3,3) , strides = (2,2) , activation='relu' , padding='same' , name='layer14' , )(layer13)
## tanh keeps the reconstruction in the [-1, 1] range of the normalised data.
out = Conv2D(filters = 3 , kernel_size=(3,3) , activation='tanh' , padding='same' , name = 'output')(layer14)
model = Model(inputs = inp , outputs = out)
model.summary()
## batch_size=4 because this network is large — presumably a memory limit; confirm on target GPU.
loss , t = train(model , 1 , 0.003 ,data_location , batch_size= 4)
print('loss : {} time for 1 epoch : {}'.format(loss, t))
## Output when using 32 ==> 64 ==> 32
visualize(model , 'layer13' , sample[100] , size = (30 , 30))
## You can look at how the original data is reconstruced. With each filter trying to recreate what th early layers captures.
## Output when using 32 ==> 64 ==> 128 ==> 128 ==> 64 ==> 32
visualize(model , 'layer12' , sample[100] , size = (30 , 30))
## You can look at how the original data is reconstruced. With each filter trying to recreate what th early layers captures.
## Full architecture
visualize(model , 'layer11' , sample[100] , size = (30 , 30))
visualize(model , 'layer12' , sample[100] , size = (30 , 30))
visualize(model , 'layer13' , sample[100] , size = (30 , 30))
## You can look at how the original data is reconstruced. With each filter trying to recreate what th early layers captures.
## You can look at how the original data is reconstruced. With each filter trying to recreate what th early layers captures.
##
## Variant of the deep autoencoder above: same encoder/decoder shapes, but
## the two shallowest skip connections (layer3 and layer2) are disabled —
## only the deepest three skips remain. Trained for 5 epochs instead of 1.
inp = Input(shape = (256 , 256 , 3))
layer1 = Conv2D(filters=32 , kernel_size=(3,3) , strides = (2,2) , activation='relu', padding='same' , name='layer1' , )(inp)
layer2 = Conv2D(filters=64 , kernel_size=(3,3) , strides=(2,2), activation='relu', padding='same' , name='layer2' , )(layer1)
layer3 = Conv2D(filters=128 , kernel_size=(3,3) , strides=(2,2) , activation='relu' , padding='same' , name='layer3' , )(layer2)
layer4 = Conv2D(filters = 256 , kernel_size=(3,3) , strides=(2,2) , activation='relu' , padding='same' , name='layer4' , )(layer3)
layer5 = Conv2D(filters = 512 , kernel_size=(3,3) , strides=(2,2) , activation='relu' , padding='same' , name = 'layer5' , )(layer4)
layer6 = Conv2D(filters = 1024 , kernel_size=(3,3) , strides=(2,2) , activation='relu' , padding='same' , name = 'layer6' , )(layer5)
## Default 2x2 average pooling: 4x4x1024 -> 2x2x1024 (= 4096 values).
layer7 = AveragePooling2D()(layer6) ## 4096 feature representation
## The layer 7 can now be used for any kind of visualization, The official docs of Keras mentions using t-SNE.
## We have also avoid the sparsity constraints , which will avoid all the nodes giving high output i.e feature representation
## will be more robust
############################# Decoder network ################################
layer8 = Conv2DTranspose(filters = 1024 , kernel_size =(3,3) , strides = (2,2) , activation='relu' , padding='same' , name='layer8' , )(layer7)
layer8 = Concatenate()([layer8 , layer6])
layer9 = Conv2DTranspose(filters = 512 , kernel_size =(3,3) , strides = (2,2) , activation='relu' , padding='same' , name='layer9' , )(layer8)
layer9 = Concatenate()([layer9 , layer5])
layer10 = Conv2DTranspose(filters = 256 , kernel_size=(3,3) , strides = (2,2) , activation='relu' , padding='same' , name='layer10' , )(layer9)
layer10 = Concatenate()([layer10 , layer4])
layer11 = Conv2DTranspose(filters = 128 , kernel_size=(3,3) , strides = (2,2) , activation='relu' , padding='same' , name='layer11' , )(layer10)
## Shallow skip connections deliberately disabled for this experiment:
# layer11 = Concatenate()([layer11 , layer3])
layer12 = Conv2DTranspose(filters = 64 , kernel_size=(3,3) , strides = (2,2) , activation='relu' , padding='same' , name='layer12' , )(layer11)
# layer12 = Concatenate()([layer12 , layer2])
layer13 = Conv2DTranspose(filters = 32 , kernel_size=(3,3) , strides = (2,2) , activation='relu' , padding='same' , name='layer13' , )(layer12)
layer14 = Conv2DTranspose(filters = 32 , kernel_size=(3,3) , strides = (2,2) , activation='relu' , padding='same' , name='layer14' , )(layer13)
## tanh keeps the reconstruction in the [-1, 1] range of the normalised data.
out = Conv2D(filters = 3 , kernel_size=(3,3) , activation='tanh' , padding='same' , name = 'output')(layer14)
model = Model(inputs = inp , outputs = out)
model.summary()
loss , t = train(model , 5 , 0.003 ,data_location , batch_size= 4)
print('loss : {} time for 1 epoch : {}'.format(loss, t))
visualize(model , 'layer11' , sample[100] , size = (30 , 30))
def noisy(noise_typ, image):
    """Return a noisy copy of `image` (H, W, C) using the requested model.

    Parameters
    ----------
    noise_typ : one of "gauss", "s&p", "poisson", "speckle".
    image : 3-D numpy array. NOTE(review): "s&p" writes salt pixels as 1,
        which is "white" only for float images in [0, 1]; for uint8 data the
        salt value should probably be 255 — confirm against callers.

    Returns
    -------
    A new array (the input is never modified); returns None for an
    unrecognised `noise_typ`.
    """
    if noise_typ == "gauss":
        # Additive zero-mean Gaussian noise, variance 0.1.
        row, col, ch = image.shape
        mean = 0
        var = 0.1
        sigma = var ** 0.5
        gauss = np.random.normal(mean, sigma, (row, col, ch))
        return image + gauss
    elif noise_typ == "s&p":
        s_vs_p = 0.5            # salt/pepper split
        amount = 0.004          # fraction of values corrupted; change and experiment
        out = np.copy(image)
        # Salt mode.
        # BUGFIX 1: fancy indexing with a *list* of index arrays is an error
        # in modern NumPy — a tuple of arrays is required.
        # BUGFIX 2: randint's upper bound is exclusive, so the original
        # `randint(0, i - 1)` could never corrupt the last index of any axis.
        num_salt = np.ceil(amount * image.size * s_vs_p)
        coords = tuple(np.random.randint(0, i, int(num_salt))
                       for i in image.shape)
        out[coords] = 1
        # Pepper mode (same fixes as above).
        num_pepper = np.ceil(amount * image.size * (1. - s_vs_p))
        coords = tuple(np.random.randint(0, i, int(num_pepper))
                       for i in image.shape)
        out[coords] = 0
        return out
    elif noise_typ == "poisson":
        # Scale by the next power of two above the number of unique values,
        # draw Poisson counts, then scale back.
        vals = len(np.unique(image))
        vals = 2 ** np.ceil(np.log2(vals))
        return np.random.poisson(image * vals) / float(vals)
    elif noise_typ == "speckle":
        # Multiplicative noise: image + image * N(0, 1).
        row, col, ch = image.shape
        gauss = np.random.randn(row, col, ch)
        return image + image * gauss
## Demo: corrupt a sample with salt-and-pepper noise, then run it through the
## saved autoencoder and display the denoised reconstruction.
clean = sample[100]
corrupted = noisy('s&p', clean)
plt.imshow(corrupted)
plt.show()
model.save("deep_autoencoder.h5")
from keras.models import load_model
model = load_model("deep_autoencoder.h5")
reconstruction = model.predict(normalise_data(np.array([corrupted])))
plt.imshow(denormalise_data(reconstruction[0]))
plt.show()